[core] Move stream reconnect logic to getReadable level #1847
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
🦋 Changeset detected
Latest commit: 4a0258b
The changes in this PR will be included in the next version bump. This PR includes changesets to release 17 packages.
🧪 E2E Test Results
❌ Some tests failed
Failed Tests
▲ Vercel Production (1 failed)
- vite (1 failed)
🌍 Community Worlds (92 failed)
- mongodb (14 failed)
- redis (10 failed)
- turso (68 failed)
Details by Category
❌ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
❌ Some E2E test jobs failed. Check the workflow run for details.
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
TooTallNate
left a comment
Review
The architectural shift makes sense: client-side frame counting is a cleaner abstraction than wire-level control frames, and moving it to core means it works for any world that returns a ReadableStream from readFromStream, not just world-vercel. The reconnect math, frame-counting, and partial-frame discard are all correct.
But there are two significant concerns I think need addressing before merge.
1. Byte streams lose auto-reconnect entirely
The PR explicitly opts byte streams out of reconnect:
if (value.type === 'bytes') {
// No auto-reconnect here: raw byte streams have no wire framing
const readable = new WorkflowServerReadableStream(value.name, value.startIndex);
// ...
} else {
const readable = createReconnectingFramedStream(value.name, value.startIndex);
// ...
}
The reason given is technically correct (no wire framing → no chunk boundary detection client-side), but this is a regression vs. the reverted #1790, which handled byte streams just fine because the server sent the resume hint via control frame.
The use cases that lose auto-reconnect:
- AI streaming responses (text/SSE) piped from getWritable()
- Any HTTP route doing return new Response(run.getReadable()) for raw bytes
- Any streaming workflow output that goes more than 2 minutes (the prior server-side timeout window) and uses byte type
The docs callout added by this PR points users to WorkflowChatTransport and supportsCancellation, but those address a different problem (cancellation, not reconnect). Pushing reconnect to the application layer — where every consumer has to reimplement it — is a step backward in usability.
Possible directions:
1. Frame byte streams on the writable side too (4 bytes per chunk overhead) so createReconnectingFramedStream works for them. The user-facing surface stays raw bytes; only the wire format changes.
2. Keep the control-frame approach for byte streams only as a hybrid: frame counting for non-byte streams, server-side hint for byte streams.
3. Document this as an explicit limitation and update the docs callout to specifically warn about byte streams losing reconnect, not just talk about supportsCancellation (separate issue).
(1) seems best to me — it removes the asymmetry entirely and keeps the cleaner architecture.
2. The "clean EOF means done" assumption needs verification
if (result.done || !result.value) {
// Clean EOF — stream is truly complete...
controller.close();
return;
}
This assumes the workflow-server signals "done" and "timeout/aborted" differently at the network level: clean done = FIN, timeout = error/reset. The deleted control-frame logic disambiguated these because both manifested as clean closes from a TCP perspective; the magic-footer frame was the disambiguator.
Without the control frame, the new code can't tell them apart. If the workflow-server's 2-minute timeout sends a clean FIN (rather than a TCP reset or stream error), this PR will appear to "complete" any stream that hits 2 minutes.
Is that assumption verified against the actual server behavior? The new test simulates max-duration as controller.error(...), which is fine for the unit test, but I'd want to see either:
- An e2e test confirming a real long-lived stream against workflow-server triggers reconnect (not premature close)
- A statement in the PR description / commit explaining why the server-side timeout is now an error not a clean close (was the workflow-server changed? was the timeout removed?)
The supportsCancellation callout suggests the architecture has shifted such that streams now run for the full function maxDuration rather than the old 2-minute server timeout — but if so, that's a precondition for this PR and worth calling out explicitly.
Minor
See inline comments.
What looks good
- Frame-counting math is correct: currentStartIndex += consumedFrames resumes at the right place, partial-frame buffer is correctly discarded, the math is symmetric for non-zero initial startIndex.
- Negative startIndex correctly bypasses reconnect with a clear reason (can't compute absolute resume index without a tail-index lookup), and there's a test for it.
- AbortController plumbing in world-vercel readFromStream is the right primitive. Cancel propagation through cancel(reason) { abortController.abort(reason) } correctly tears down the fetch.
- Test coverage for createReconnectingFramedStream is good: frames split across reads, partial frame at error, clean EOF, non-zero initial startIndex, negative startIndex bypass, cancel propagation. Six tests, all targeted.
- Two changesets correctly scoped: @workflow/core for the new wrapper, @workflow/world-vercel for the cancel propagation.
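To make the frame-counting math concrete, here is a minimal sketch of how completed frames could be counted in a buffered read and turned into a resume index. The names scanCompleteFrames and resumeIndex are hypothetical and not taken from the PR, and the big-endian length prefix is an assumption, since the thread only says the prefix is 4 bytes.

```typescript
// Hypothetical sketch; helper names are illustrative, not from the PR.
const FRAME_HEADER_BYTES = 4; // 4-byte length prefix, as described in the PR

interface FrameScan {
  frames: number; // complete frames found in the buffer
  consumedBytes: number; // bytes covered by those complete frames
}

// Walks [4-byte length][payload] frames and stops at the first incomplete one.
// ASSUMPTION: the length prefix is big-endian; the thread does not say.
function scanCompleteFrames(buffer: Uint8Array): FrameScan {
  const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
  let offset = 0;
  let frames = 0;
  while (offset + FRAME_HEADER_BYTES <= buffer.byteLength) {
    const payloadLength = view.getUint32(offset, false);
    const frameEnd = offset + FRAME_HEADER_BYTES + payloadLength;
    if (frameEnd > buffer.byteLength) break; // partial frame: dropped on reconnect
    offset = frameEnd;
    frames += 1;
  }
  return { frames, consumedBytes: offset };
}

// Resume position after a dropped connection: only whole frames count.
function resumeIndex(startIndex: number, consumedFrames: number): number {
  return startIndex + consumedFrames;
}
```

With two whole frames followed by three stray header bytes, the scan reports two frames, so a stream opened at startIndex 5 would reopen at 7 and the server would resend the interrupted chunk in full.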
| value.name, | ||
| value.startIndex | ||
| ); | ||
| if (value.type === 'bytes') { |
Byte streams are intentionally opted out of auto-reconnect here. This is a behavioral regression vs. the reverted #1790, which handled byte streams via server-sent control frames.
The comment correctly identifies why this is hard (no wire framing → no chunk boundary detection client-side), but pushing reconnect to the application layer means:
- Every consumer of run.getReadable() for byte streams (AI text streaming, raw HTTP responses, etc.) has to implement its own reconnect logic.
- The docs callout added by this PR (about supportsCancellation) doesn't actually help: that's a cancellation fix, not a reconnect fix.
I think the right move is to frame byte streams on the writable side too (4 bytes per chunk overhead), so createReconnectingFramedStream can be used uniformly. The user-facing API stays raw bytes; only the wire format gets the length prefix. That removes the asymmetry and keeps the cleaner architecture this PR is trying to achieve.
| * the writable buffers one frame per chunk when multi-writing). The wrapper | ||
| * counts completed frames and, on upstream error, reopens the connection | ||
| * with `startIndex = resolvedStartIndex + consumedFrames`. Partial-frame | ||
| * bytes buffered before the cut are discarded — the server will resend the |
The comment says: "On serverfull backends, reconnects should only happen during transient errors. For serverless backends, we set this constant so that we cover at least 10 minutes even if the server would be limited to e.g. 1 minute per session."
10 reconnects × 1-minute-per-session = 10 minutes covered. That's tighter than the deleted constant in world-vercel (MAX_RECONNECTS = 50, ~100 minutes coverage at 2-min server timeouts). If the underlying assumption is that streams now run for full function maxDuration (which on Pro/Enterprise can exceed 10 minutes), this cap may be too low.
Worth either:
- Bumping the constant to match the longest realistic maxDuration (~15 min Pro), so something like 30, or
- Making it configurable per-call (or via the world)
| console.warn("Error closing ReadableStream reader:", err) | ||
| }); | ||
| reader = undefined; | ||
| } |
Nit: cancel() here only cancels the active reader. There's a small race window: if cancel fires while connect() is in flight (between reader = undefined after a reconnect-triggering error and the new reader being assigned), there's nothing to cancel — the new connection completes and the loop continues reading.
A cancelled flag checked at the top of the pull loop and inside connect() would close this. Same race existed in the deleted world-vercel cancel handler, so it's not a regression — just worth tightening if you're touching this code.
let cancelled = false;
// ... in pull loop, top of for(;;):
if (cancelled) { controller.close(); return; }
// ... in cancel:
cancelled = true;
| const { world } = makeWorldWithScriptedStreams({ | ||
| 0: () => | ||
| scriptedStream([ | ||
| // Split frame into 3 byte-level reads to prove boundary-aware |
Test simulates max-duration abort as controller.error(...) — which is correct for what the wrapper sees on a network reset, but doesn't verify the actual workflow-server behavior matches.
If workflow-server's stream timeout sends a clean FIN (i.e., calls controller.close() on its end) instead of an error, this code path will treat it as EOF and not reconnect. The control-frame logic that this PR removes was specifically designed to disambiguate these two cases.
Could you confirm in the PR description whether:
- workflow-server's stream timeout has been removed entirely (streams now run for full function maxDuration), OR
- the timeout still exists but now manifests as a network error / TCP reset rather than a clean FIN?
This is the load-bearing assumption of the whole design.
|
Following up after the discussion thread: consolidating the recommended direction so it's all in one place.
Recommended direction
Move byte-stream framing into core, gated on a per-run feature flag, with the resolved choice baked into the serialized stream ref. The PR's instinct (move reconnect to core) is right. The concrete change to make it work uniformly for byte streams:
1. Frame byte streams on the writer side
In
ops.push(value.pipeTo(writable));
It would become:
ops.push(
  value
    .pipeThrough(getByteFramingStream()) // wrap each chunk in [4-byte len][bytes]
    .pipeTo(writable)
);
Cost: 4 bytes per server-side chunk. For typical streaming workloads (AI text chunks of dozens of bytes, structured byte payloads in the KB+ range) this is well under 5% overhead.
2. Use
|
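As a rough illustration of the writer-side framing proposed above, the following sketch wraps each chunk in a 4-byte length prefix using a TransformStream. It is not the implementation from #1853: frameChunk, createByteFramingStream, and framingOverhead are hypothetical names, and the big-endian prefix is an assumption.

```typescript
// Hypothetical sketch of [4-byte len][bytes] framing; not the code from #1853.
const LENGTH_PREFIX_BYTES = 4;

// Prepends the payload length to one chunk.
// ASSUMPTION: big-endian prefix; the thread does not specify byte order.
function frameChunk(chunk: Uint8Array): Uint8Array {
  const framed = new Uint8Array(LENGTH_PREFIX_BYTES + chunk.byteLength);
  new DataView(framed.buffer).setUint32(0, chunk.byteLength, false);
  framed.set(chunk, LENGTH_PREFIX_BYTES);
  return framed;
}

// Stream form, usable as value.pipeThrough(createByteFramingStream()).
function createByteFramingStream(): TransformStream<Uint8Array, Uint8Array> {
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      controller.enqueue(frameChunk(chunk));
    },
  });
}

// Fraction of wire bytes spent on the prefix for a given payload size.
function framingOverhead(payloadBytes: number): number {
  return LENGTH_PREFIX_BYTES / (LENGTH_PREFIX_BYTES + payloadBytes);
}
```

Because the prefix is a fixed 4 bytes, the relative cost depends only on chunk size: framingOverhead(96) is 0.04, and it shrinks further for KB-range payloads.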
TooTallNate
left a comment
Approving — withdrawing my prior request-for-changes.
Context: my earlier blocker was that this PR opts byte streams out of auto-reconnect, which I called a regression vs. the now-reverted #1790. Since then we discussed it and settled on a different plan: this PR lands on stable as-is (object-stream reconnect only), and byte-stream support gets added on main/v5 via wire-level framing in a follow-up. The framing work is now in PRs #1854 (workflowCoreVersion on HealthCheckResult) and #1853 (the framing itself), which together let createReconnectingFramedStream be applied uniformly to byte streams on main once they land.
So for stable, this PR is the right scope:
- Object-stream reconnect via createReconnectingFramedStream is correct.
- Byte streams legitimately can't be auto-reconnected with the legacy unframed wire format that stable ships, so opting them out is the right call there.
- Frame-counting math, AbortController plumbing, world-vercel simplification all look good.
The earlier non-blocking concerns I raised still apply — would be nice to address them but I'm not gating on them:
- The "clean EOF means done" assumption. Worth a sentence in the commit/PR description confirming whether workflow-server's stream timeout now manifests as a network error rather than a clean FIN, since the deleted control-frame logic was specifically there to disambiguate them.
- FRAMED_STREAM_MAX_RECONNECTS = 10 is tighter than the deleted MAX_RECONNECTS = 50. Probably fine, but worth a sanity check against the longest realistic Pro/Enterprise maxDuration.
- Cancel race during reconnect: pre-existing, not a regression here.
Co-authored-by: Peter Wielander <mittgfu@gmail.com> Signed-off-by: Peter Wielander <mittgfu@gmail.com>
…l-at-getreadable-level # Conflicts: # packages/world-vercel/src/streamer.test.ts # packages/world-vercel/src/streamer.ts
…preview
- serialization: reset reconnectCount to 0 when a reconnect delivers a frame, so FRAMED_STREAM_MAX_RECONNECTS bounds *consecutive* failures (as documented) instead of the lifetime total. Long-lived serverless streams that reconnect repeatedly but keep delivering no longer get falsely capped. Export the constant for tests.
- tests: add coverage for the max-consecutive-reconnect cap, the budget-reset-on-progress regression, and multi-frame-per-read drain.
- world-vercel: temporarily point WORKFLOW_SERVER_URL_OVERRIDE at the peter-stream-timeout-error workflow-server preview so this branch's e2e exercises the matching server-side stream-timeout behavior. To be cleared before merge (see comment).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The consecutive-failure cap resets on forward progress, which is correct for a backend that honors startIndex. But a World whose readFromStream ignored startIndex and re-delivered earlier chunks would report progress on every reconnect, so the consecutive cap would never trip — an unbounded reconnect loop. Add FRAMED_STREAM_MAX_TOTAL_RECONNECTS (1000), a hard ceiling that never resets, so the loop always terminates while staying far above any legitimate long-lived stream. Add a test covering the pathological ignore-startIndex case. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
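The two caps described in these commit messages can be pictured as a small budget object. This is a sketch under assumptions, not the code in the PR: ReconnectBudget and its method names are hypothetical, and the limits are passed in rather than read from FRAMED_STREAM_MAX_RECONNECTS or FRAMED_STREAM_MAX_TOTAL_RECONNECTS.

```typescript
// Hypothetical sketch of a two-tier reconnect budget; names are illustrative.
class ReconnectBudget {
  private consecutive = 0; // resets whenever a reconnect delivers a frame
  private total = 0; // never resets: the hard ceiling
  private readonly maxConsecutive: number;
  private readonly maxTotal: number;

  constructor(maxConsecutive: number, maxTotal: number) {
    this.maxConsecutive = maxConsecutive;
    this.maxTotal = maxTotal;
  }

  // Call before reopening; returns false once either cap is exhausted.
  tryReconnect(): boolean {
    if (this.consecutive >= this.maxConsecutive) return false;
    if (this.total >= this.maxTotal) return false;
    this.consecutive += 1;
    this.total += 1;
    return true;
  }

  // Call when a reconnect delivers at least one complete frame.
  recordProgress(): void {
    this.consecutive = 0;
  }
}
```

A backend that ignores startIndex would call recordProgress() after every reconnect, so only the total counter can stop the loop; a healthy long-lived stream stays far below that ceiling while the consecutive counter still catches a run of back-to-back failures.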
VaguelySerious
left a comment
AI review: blocking issues found
| */ | ||
| const WORKFLOW_SERVER_URL_OVERRIDE = ''; | ||
| const WORKFLOW_SERVER_URL_OVERRIDE = | ||
| 'https://workflow-server-git-peter-stream-timeout-error.vercel.sh'; |
AI Review: Blocking
The hard-coded server override must be cleared back to '' before this merges — as written it points every consumer's stream traffic at a preview deployment. You've already flagged it as temporary and the two red CI signals are by-design, so this is just the merge gate: don't land until this constant is reset and those checks go green.
| // fetch implementations differ on whether cancelling the body | ||
| // alone tears down the socket. | ||
| return new ReadableStream<Uint8Array>({ | ||
| start(controller) { |
AI Review: Note
The rewrap pumps the upstream eagerly in start() — the loop calls reader.read() → controller.enqueue(value) with no backpressure check, so it drains the upstream as fast as the socket delivers regardless of how fast the downstream consumer reads. The previous code returned response.body directly, which propagates backpressure to the socket. For a fast producer + slow consumer (exactly the long-lived streaming case this path serves), the wrapper can buffer the whole stream in the controller queue → unbounded memory.
Consider a pull-based source instead of an eager start() pump (read one chunk per pull, enqueue, return), or gate the pump on controller.desiredSize. That keeps cancel→abort propagation while preserving backpressure.
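A pull-based wrapper along the lines suggested here might look like the sketch below. It is illustrative only: wrapWithAbort is a hypothetical name, and the upstream stream and AbortController are passed in so the example does not depend on a real fetch call.

```typescript
// Hypothetical sketch: cancel-to-abort propagation without an eager pump.
function wrapWithAbort(
  upstream: ReadableStream<Uint8Array>,
  abortController: AbortController
): ReadableStream<Uint8Array> {
  const reader = upstream.getReader();
  return new ReadableStream<Uint8Array>({
    // Called only when the consumer has room, so a slow consumer
    // leaves unread data on the socket instead of in memory.
    async pull(controller) {
      try {
        const result = await reader.read();
        if (result.done) {
          controller.close();
          return;
        }
        controller.enqueue(result.value);
      } catch (err) {
        controller.error(err);
      }
    },
    // Downstream cancel aborts the request and releases the upstream body.
    cancel(reason) {
      abortController.abort(reason);
      return reader.cancel(reason);
    },
  });
}
```

In a real adapter the upstream would presumably be response.body from a fetch started with abortController.signal. One chunk is read per pull, so the controller queue stays bounded by the stream's high-water mark.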
| * hard ceiling guarantees the loop always terminates. It is set high enough | ||
| * (hours of streaming at realistic per-session timeouts) to never interfere | ||
| * with legitimate long-lived streams. | ||
| */ |
AI Review: Note
With the server now aborting in-progress streams at max-duration (rather than closing cleanly), the negative-startIndex branch becomes a behavior change worth calling out: those reads opt out of reconnect, so a last-N consumer that previously saw a clean EOF at the duration limit will now surface the abort as a hard error. The object-stream consumers that use negative indices (e.g. tail-resolving clients) resolve to an absolute index before connecting, so in practice they shouldn't hit this — but a doc line or a comment noting "negative startIndex + mid-stream server abort = error, not silent close" would save a future debugging session.
|
(AI) Cross-PR context & merge order
Together these make
How the pieces fit:
Behavioural note: the reconnecting reader only reopens on a connection error — a clean close means "complete". So the client change is inert on its own and only takes effect once paired with the coordinated server-side change that ends a timed-out connection with an error rather than a silent close (handled separately). Shipping the client first is therefore safe and must precede that server change. Suggested order:
#1853 is independent and can land on its own schedule; it unblocks byte-stream reconnect as a future follow-up.
TooTallNate
left a comment
Approve — with one hard pre-merge gate (the URL override) and two non-blocking design notes
I built @workflow/core + @workflow/world-vercel from this branch and ran the new suites locally: all 10 reconnecting-framed-stream tests and all 20 streamer tests pass.
The design is right
Moving reconnect from the adapter's wire-sniffing control-frame approach (#1790, reverted here) up to the framing layer is the correct factoring. The framing layer is the one place that already knows chunk boundaries, so "count completed frames, resume at startIndex + consumed" needs no wire protocol additions at all — and it works identically for any World adapter, not just world-vercel.
Specifics I verified:
- Partial-frame discard on reconnect is correct: buffered mid-frame bytes are dropped, currentStartIndex += consumedFrames, and the server resends the in-flight chunk in full. The test at line 140 simulates exactly the production scenario (3 bytes of a frame, then a max-duration abort) and asserts the resume index.
- The two-tier budget is well-reasoned. Consecutive cap (50) resets on forward progress, so a long-lived stream that reconnects hundreds of times while still delivering is never falsely killed; this is tested with FRAMED_STREAM_MAX_RECONNECTS + 5 productive reconnects. The absolute backstop (1000) covers the pathological case where a backend ignores startIndex and reports false progress forever, which is also tested. Both constants exported and documented with the reasoning.
- Clean EOF = completion, error = reconnect is the right contract, and negative startIndex (last-N) correctly opts out since an absolute resume position can't be computed.
- Frames pass through with headers intact to the downstream deserializer, which already expects the framed layout; the wrapper only counts, it doesn't re-frame. Nice and minimal.
- The byte-stream opt-out is correctly scoped (raw streams have no framing to count) and the doc callouts about supportsCancellation for long-lived stream routes are a genuinely useful addition independent of this change.
Hard gate before merge
WORKFLOW_SERVER_URL_OVERRIDE is pointed at a preview deployment. The in-code comment documents this as temporary and correctly predicts the red CI (the override lint guard + the 4 utils.test.ts override-precedence cases — I checked the failing unit job and those 4 are precisely the failures). This must go back to '' before merge, and CI needs one green re-run after the reset — the current Tests run is also three weeks old (May 29) relative to the branch head, and the MongoDB/Redis community-world results from that run are stale enough that I wouldn't sign off on them either way without a fresh run.
Non-blocking notes
- A reconnect-time connection failure is fatal rather than budgeted. The retry budget only covers reader.read() errors. If reconnect() → connect() itself throws (the reopen fetch fails transiently, which is plausible during exactly the kind of server blip that triggers reconnect in the first place), the catch in pull errors the stream immediately with budget remaining. Folding connect failures into the same budgeted loop (count it, retry) would make the wrapper robust against the scenario it exists for. Fine as a follow-up.
- The streamer's cancel-propagation wrapper trades away backpressure. The eager pump() loop in readFromStream reads upstream as fast as the network delivers and enqueues without consulting desiredSize, so a slow consumer now buffers the stream in the controller queue instead of letting the socket backpressure naturally (the old code returned response.body directly, which is pull-driven). A pull-based wrapper would keep the AbortController plumbing and preserve backpressure. For typical workflow stream sizes this is unlikely to matter, but it's an unnecessary semantic change for what is otherwise just abort plumbing.
- Changeset bump types (patch for both packages on the GA channel) are defensible since this fixes silent truncation, even though it adds new behavior.
Once the override is reset and CI is green on a current run, this is good to land. The cross-PR sequencing in the description (this merging and releasing before the coordinated server-side behavior change takes effect) is the right order — until the server change ships, this code path simply never triggers, which makes it safe to release ahead.
| * | ||
| * NOTE (temporary): this is intentionally pointed at the | ||
| * `peter-stream-timeout-error` workflow-server preview so this branch's e2e | ||
| * tests exercise the matching server-side stream-timeout behavior. It will be | ||
| * cleared back to '' once those server-side changes merge — not a review | ||
| * concern. While it is set, two CI signals are red by design and will go | ||
| * green again on reset: the "WORKFLOW_SERVER_URL_OVERRIDE is empty" lint | ||
| * guard, and the override-precedence cases in `utils.test.ts` (the hardcoded | ||
| * value intentionally wins over the env var, which those cases assert is | ||
| * absent). | ||
| */ | ||
| const WORKFLOW_SERVER_URL_OVERRIDE = ''; | ||
| const WORKFLOW_SERVER_URL_OVERRIDE = | ||
| 'https://workflow-server-git-peter-stream-timeout-error.vercel.sh'; |
| */ | |
| const WORKFLOW_SERVER_URL_OVERRIDE = ''; |
Reverts the temporary preview override (and its NOTE) so utils.ts has no diff. The matching server-side stream-timeout behavior is validated via its own PR; the SDK override must stay empty (lint guard enforces it). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Moves stream reconnect handling out of the world-vercel adapter and up to the getReadable/core level, where chunk framing already lives, so reconnect works the same way across world adapters.
Reverts #1790 (the adapter-level control-frame approach). The reconnecting reader counts the 4-byte length-prefixed frames it has received and, on a connection error, reopens the stream from startIndex + framesConsumed. A clean end-of-stream is treated as completion (no reconnect). Object/serialized streams only: raw byte streams have no wire framing to count and are opted out (the caller owns its own reconnect strategy). Bounded by a consecutive-failure cap (reset on forward progress) plus an absolute total-reconnect backstop.
Closes #1801
Closes #1802
After shipping this
Forward-ported to main in #2318. See the cross-PR comment for merge order: this only takes effect once paired with the coordinated server-side change that errors a timed-out stream connection instead of closing it cleanly.